﻿/**************************************************************
 *
 * 唯一标识：51543fe7-d1fb-48dd-835a-55cebc82ee26
 * 命名空间：Xunit.Sgr.Cap
 * 创建时间：2024/7/22 16:26:57
 * 机器名称：DESKTOP-HJ4OAG9
 * 创建者：CocoYuan
 * 电子邮箱：fengqinhua2016@163.com
 * 描述：
 *
 **************************************************************/

using DotNetCore.CAP;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json.Linq;
using Sgr;
using Sgr.Cap;
using Sgr.Trackers;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Xunit.Sgr.Cap.HelperClass;

namespace Xunit.Sgr.Cap
{
    public class CustomTestSubscriber : ICapSubscribe
    {
        private readonly MessageCollector _collector;
        private readonly ILogger<CustomTestSubscriber> _logger;
        private readonly IMessageMark _messageMark;

        public CustomTestSubscriber(MessageCollector collector, IMessageMark messageMark, ILogger<CustomTestSubscriber> logger)
        {
            _logger = logger;
            _collector = collector;
            _messageMark = messageMark;
        }

        [CapSubscribe("sgr.custom.message")]
        public void Handle(string message)
        {
            _logger.LogWarning($"{nameof(Handle)} method called. {message}");
            _collector.Add(message);
        }

        [CapSubscribe("sgr.header.message")]
        public void Handle2(string message, [FromCap] CapHeader header)
        {
            string value = $"{message}-{header["my.header.first"]}-{header["my.header.second"]}";

            _logger.LogWarning($"{nameof(Handle)} method called. {value}");
            _collector.Add(value);
        }

        [CapSubscribe("sgr.mark.message")]
        public async Task Handle3(string message, [FromCap] CapHeader header)
        {
            /**************************************************
             * 幂等处理的步骤梳理：
             *
             * 1. 获取消息的唯一标识MsgId
             * 2. 借助Redis中间件验证MsgId对应的消息是否已处理。如果已处理，则直接返回。如果未处理，则使用MsgId SET NX 指令将其写入Redis，标识 MsgId 已被处理。（注意，需保证整个读写过程的原子性；需设置合理的过期时间）
             * 3. 执行业务逻辑，如果业务逻辑执行成功，则直接返回；如果业务逻辑执行失败，则需要有效的处理，避免消息丢失，可能从处理方式包括有：
             *  3.1 直接返回
             *  3.2 抛出自定义异常，使得CAP在本地消息表中记录当前消息的异常，并重试。重试时，在步骤2中增加判断，看当前消息是否已处理业务逻辑发生异常而重试，如果是则步骤2的验证逻辑，直接再次执行业务逻辑。
             *  3.3 包装当前消息，重新发布一个新的消息，在新消息中重试业务逻辑。
             *  3.4 清理步骤2中设置MsgId已处理的状态，并重试。
             *
             * 注意：此处幂等处理未借助数据库来实现，而是采用Redis作为中间件来进行技术实现。原因就是，有些消费者的业务逻辑中没有数据库操作，无法借助数据库的事务来实现幂等处理时的数据一致性。此时，可以考虑借助Redis来实现
             *
             **************************************************/

            //此处，唯一标识应该以业务的业务主键作为标识更为合理，以应对生产者的重发。不能以消息的MessageId，在生产者因为某些原因手动重发（例如上游针对一个交易重复请求了）的场景下起不到去重/幂等的效果（因消息id不同）。
            if (header.IsMessageExceptionRetry() || await _messageMark.MarkAsProcessedAsync(header.GetMessageId()))
            {
                try
                {
                    if (header.IsMessageExceptionRetry())
                    {
                        _logger.LogWarning($"[sgr.mark.message]处理消息因异常而重试...");

                        _logger.LogWarning($"{nameof(Handle)} method called. {message}");
                        _collector.Add(message);
                    }
                    else
                        throw new Exception("自定义错误...");
                }
                catch (Exception ex)
                {
                    _logger.LogWarning($"[sgr.mark.message]处理消息时发生了异常...");
                    //header.MessageExceptionRetry();
                    throw new CapMessageExceptionRetryException(ex);
                }
            }
            else
                _logger.LogWarning($"[sgr.mark.message]消息已处理过...");
        }
    }
}